/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "orc/Exceptions.hh"
#include "RLE.hh"
#include "Reader.hh"
#include "StripeStream.hh"

#include "wrap/coded-stream-wrapper.h"

#include <stdexcept>

namespace orc {

  /**
   * Build the stream accessor for a single stripe.
   *
   * Each argument is recorded in the corresponding member; the constructor
   * performs no other work.
   */
  StripeStreamsImpl::StripeStreamsImpl(
      const RowReaderImpl& _reader,
      uint64_t _index,
      const proto::StripeInformation& _stripeInfo,
      const proto::StripeFooter& _footer,
      uint64_t _stripeStart,
      InputStream& _input,
      const Timezone& _writerTimezone)
      : reader(_reader),
        stripeInfo(_stripeInfo),
        footer(_footer),
        stripeIndex(_index),
        stripeStart(_stripeStart),
        input(_input),
        writerTimezone(_writerTimezone) {
    // Nothing to do beyond member initialization.
  }

  // No explicit cleanup logic; the destructor is still defined out of line
  // in this translation unit.
  StripeStreamsImpl::~StripeStreamsImpl() = default;

  // No explicit cleanup logic; the destructor is still defined out of line
  // in this translation unit.
  StreamInformation::~StreamInformation() = default;

  // No explicit cleanup logic; the destructor is still defined out of line
  // in this translation unit.
  StripeInformation::~StripeInformation() = default;


  // No explicit cleanup logic; the destructor is still defined out of line
  // in this translation unit.
  StreamInformationImpl::~StreamInformationImpl() = default;

  const std::vector<bool> StripeStreamsImpl::getSelectedColumns() const {
    return reader.getSelectedColumns();
  }

  /**
   * Get the encoding entry recorded in the stripe footer for a column.
   * @param columnId index into the footer's column list; not range-checked
   *        here
   * @return a copy of the footer's ColumnEncoding entry at that index
   */
  proto::ColumnEncoding
  StripeStreamsImpl::getEncoding(uint64_t columnId) const {
    // The generated protobuf accessor takes an int index.
    const int columnIndex = static_cast<int>(columnId);
    return footer.columns(columnIndex);
  }

  /**
   * Get the writer timezone supplied at construction.
   * @return reference to the stored timezone
   */
  const Timezone& StripeStreamsImpl::getWriterTimezone() const {
    const Timezone& timezone = writerTimezone;
    return timezone;
  }

  /**
   * Get the error stream held by the reader's file contents.
   * @return the errorStream pointer, passed through unchanged
   */
  std::ostream* StripeStreamsImpl::getErrorStream() const {
    std::ostream* errorSink = reader.getFileContents().errorStream;
    return errorSink;
  }

    std::unique_ptr<SeekableInputStream>
  StripeStreamsImpl::getStream(uint64_t columnId,
                               proto::Stream_Kind kind,
                               bool shouldStream) const {
    // Open the first stream in the stripe footer whose kind and column match
    // the request.
    //
    // columnId     - column the stream must belong to
    // kind         - stream kind to look for
    // shouldStream - when true the file stream is read in blocks of
    //                input.getNaturalReadSize(); otherwise the block size is
    //                the full stream length
    //
    // Returns the matching stream wrapped by createDecompressor(), or an
    // empty pointer when the footer lists no matching stream.
    // Throws ParseError when the matching stream would extend past the end
    // of the stripe's index + data region.

    // Footer streams are laid out back to back starting at stripeStart, so a
    // stream's offset is the sum of the lengths of the streams before it.
    uint64_t offset = stripeStart;
    const uint64_t dataEnd =
        stripeInfo.offset() + stripeInfo.indexlength() + stripeInfo.datalength();
    MemoryPool* pool = reader.getFileContents().pool;
    for (int i = 0; i < footer.streams_size(); ++i) {
      const proto::Stream& stream = footer.streams(i);
      const uint64_t streamLength = stream.length();
      if (stream.has_kind() &&
          stream.kind() == kind &&
          stream.column() == static_cast<uint64_t>(columnId)) {
        uint64_t myBlock = shouldStream ? input.getNaturalReadSize() : streamLength;
        // Overflow-safe form of `offset + streamLength > dataEnd`: a
        // malformed footer can declare a length large enough to wrap the
        // unsigned sum, which would let the bogus stream slip past the check.
        if (offset > dataEnd || streamLength > dataEnd - offset) {
          std::stringstream msg;
          msg << "Malformed stream meta at stream index " << i << " in stripe " << stripeIndex
              << ": streamOffset=" << offset << ", streamLength=" << streamLength
              << ", stripeOffset=" << stripeInfo.offset() << ", stripeIndexLength="
              << stripeInfo.indexlength() << ", stripeDataLength=" << stripeInfo.datalength();
          throw ParseError(msg.str());
        }
        return createDecompressor(reader.getCompression(),
                                  std::unique_ptr<SeekableInputStream>
                                  (new SeekableFileInputStream
                                   (&input,
                                    offset,
                                    streamLength,
                                    *pool,
                                    myBlock)),
                                  reader.getCompressionSize(),
                                  *pool);
      }
      offset += streamLength;
    }
    // No stream of the requested kind exists for this column.
    return std::unique_ptr<SeekableInputStream>();
  }

  /**
   * Get the memory pool held by the reader's file contents.
   * @return reference obtained by dereferencing the stored pool pointer
   */
  MemoryPool& StripeStreamsImpl::getMemoryPool() const {
    MemoryPool* pool = reader.getFileContents().pool;
    return *pool;
  }

  /**
   * Get the Hive 0.11 decimal overflow setting.
   * @return the value reported by the row reader's accessor of the same name
   */
  bool StripeStreamsImpl::getThrowOnHive11DecimalOverflow() const {
    const bool throwOnOverflow = reader.getThrowOnHive11DecimalOverflow();
    return throwOnOverflow;
  }

  /**
   * Get the forced scale setting for Hive 0.11 decimals.
   * @return the value reported by the row reader's accessor of the same name
   */
  int32_t StripeStreamsImpl::getForcedScaleOnHive11Decimal() const {
    const int32_t forcedScale = reader.getForcedScaleOnHive11Decimal();
    return forcedScale;
  }

  void StripeInformationImpl::ensureStripeFooterLoaded() const {
    if (stripeFooter.get() == nullptr) {
      std::unique_ptr<SeekableInputStream> pbStream =
        createDecompressor(compression,
                           std::unique_ptr<SeekableInputStream>
                             (new SeekableFileInputStream(stream,
                                                          offset +
                                                            indexLength +
                                                            dataLength,
                                                          footerLength,
                                                          memory)),
                           blockSize,
                           memory);
      stripeFooter.reset(new proto::StripeFooter());
      if (!stripeFooter->ParseFromZeroCopyStream(pbStream.get())) {
        throw ParseError("Failed to parse the stripe footer");
      }
    }
  }

  /**
   * Describe one stream of this stripe.
   *
   * Loads the stripe footer on demand, then computes the stream's offset as
   * the stripe offset plus the lengths of all streams listed before it in
   * the footer.
   *
   * @param streamId zero-based index into the footer's stream list
   * @return a StreamInformationImpl built from the computed offset and the
   *         footer's stream entry
   * @throws std::logic_error if streamId is not smaller than the number of
   *         streams in the footer
   * @throws ParseError if the stripe footer cannot be parsed
   */
  std::unique_ptr<StreamInformation>
  StripeInformationImpl::getStreamInformation(uint64_t streamId) const {
    ensureStripeFooterLoaded();
    // Reject out-of-range ids up front: the protobuf accessor takes an int
    // index and must not be called with an index outside the list.
    const int streamCount = stripeFooter->streams_size();
    if (streamId >= static_cast<uint64_t>(streamCount)) {
      throw std::logic_error("stream index out of range");
    }
    const int target = static_cast<int>(streamId);
    uint64_t streamOffset = offset;
    for (int s = 0; s < target; ++s) {
      streamOffset += stripeFooter->streams(s).length();
    }
    return std::unique_ptr<StreamInformation>
      (new StreamInformationImpl(streamOffset,
                                 stripeFooter->streams(target)));
  }

}
